package com.xiaofan.apitest.source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

/**
 * Flink streaming job that consumes string records from the Kafka topic `sensor`
 * and prints them.
 *
 * Handy Kafka CLI commands for working with the topics used in these tests:
 * <p>
 * List topics: bin/kafka-topics.sh --list --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181
 * <p>
 * Create topic: bin/kafka-topics.sh --create --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --replication-factor 2 --partitions 3 --topic sensor
 * <p>
 * Delete topic: bin/kafka-topics.sh --delete --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --topic sensor
 * <p>
 * Produce data: bin/kafka-console-producer.sh --broker-list 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --topic sensor
 * <p>
 * Consume data: bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --from-beginning --topic sink_test
 * <p>
 * NOTE(review): the consume command above reads topic `sink_test`, while this job reads
 * `sensor` — confirm which topic is intended.
 */
object KafkaSourceTest {

  /**
   * Entry point: wires a Kafka consumer for topic `sensor` into a Flink stream,
   * prints the stream, and submits the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Kafka client settings: broker list and the consumer group this job joins.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091")
    kafkaProps.setProperty("group.id", "consumer-group")

    // Source reading topic "sensor", deserializing each record with SimpleStringSchema.
    val sensorConsumer = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), kafkaProps)

    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Attach the source and print every record of the resulting stream.
    val sensorStream: DataStream[String] = streamEnv.addSource(sensorConsumer)
    sensorStream.print()

    // Nothing runs until execute() submits the job.
    streamEnv.execute("kafka source test")
  }
}
